package com.stream

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, WindowedStream}
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

import scala.util.parsing.json.JSON
//import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

/**
 * Streaming word count over a Kafka topic.
 *
 * Reads string records from the `textTopic` Kafka topic, splits every record on
 * commas, and prints the number of occurrences of each token over a sliding
 * window that is 5 seconds long and advances every 2 seconds.
 */
object App {

  /**
   * Builds the streaming job and submits it for execution.
   *
   * @param args command-line arguments; not used by this job
   */
  def main(args: Array[String]): Unit = {
    println("Hello World!")
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Enable checkpointing with a 5000 ms interval between state checkpoints.
    // With checkpointing enabled, the Flink Kafka consumer periodically stores
    // the Kafka offsets together with the operator state in each checkpoint.
    // If the job fails, Flink restores the latest checkpoint and resumes
    // reading from the offsets recorded in it.
    env.enableCheckpointing(5000)

    // Optional settings, currently disabled. Because the event-time line stays
    // commented out, the windows below use the environment's default time
    // characteristic.
    // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // Kafka consumer configuration. "zookeeper.connect" is intentionally not
    // set: per the Flink Kafka connector documentation only the Kafka 0.8
    // consumer requires it; the 0.10 consumer connects through
    // "bootstrap.servers".
    val props = new Properties()
    props.setProperty("bootstrap.servers", "192.168.86.101:9092")
    props.setProperty("group.id", "stream_flink_test")

    val consumer = new FlinkKafkaConsumer010[String]("textTopic", new SimpleStringSchema(), props)

    val counts: DataStream[(String, Int)] =
      env
        .addSource(consumer)
        // One record may carry several comma-separated tokens.
        .flatMap(_.split(","))
        .map(token => (token, 1))
        // Key by the token itself through a typed selector instead of a
        // positional tuple index, so the key type is String rather than an
        // untyped Java Tuple.
        .keyBy(_._1)
        // Sliding window: 5 s size, 2 s slide.
        .timeWindow(Time.seconds(5), Time.seconds(2))
        // Sum the per-token counter held in tuple field 1.
        .sum(1)

    counts.print()

    // Nothing runs until execute() is called; it submits the job graph.
    env.execute("Flink ")
  }
}
